package com.ideal.manage.collection.controller;

import com.ideal.manage.collection.model.MSG;
import com.ideal.manage.collection.model.ReportData;
import com.ideal.manage.collection.utils.JsonUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;


/**
 * Consumer worker: polls messages from Kafka and forwards each one to the
 * alarm-generation HTTP endpoint.
 *
 * <p>Each record value is parsed into a {@link ReportData} via
 * {@link JsonUtils#getDTO} and POSTed as JSON to {@link #ALARM_ENDPOINT}.
 * A failure while handling a single record is reported on {@code System.err}
 * and the record is skipped, so one bad message or a temporary outage of the
 * alarm service does not terminate the consuming thread.
 *
 * <p>The loop ends when the running thread is interrupted or when
 * {@code poll} throws; in both cases the consumer is closed before
 * {@link #run()} returns. The consumer is only ever used from the thread
 * that executes {@link #run()}.
 */
public class CustomerRunnable implements Runnable {

    /** Alarm service endpoint that receives every consumed report. */
    private static final String ALARM_ENDPOINT =
            "http://localhost:8018/minapp/alarm/generator/message_consumer";

    /** Poll timeout in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100;

    private final KafkaConsumer<String, String> kafkaConsumer;

    private final RestTemplate restTemplate;

    /**
     * @param kafkaConsumer consumer to poll; expected to be already subscribed
     *                      and to deliver String keys and values
     * @param restTemplate  HTTP client used to call the alarm endpoint
     */
    // The parameter stays a raw type so existing callers keep compiling;
    // records are read as <String, String>, exactly as the loop always did.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public CustomerRunnable(KafkaConsumer kafkaConsumer,RestTemplate restTemplate) {
        this.kafkaConsumer = kafkaConsumer;
        this.restTemplate = restTemplate;
    }

    /**
     * Polls until the thread is interrupted (or {@code poll} throws),
     * forwarding every received record to the alarm endpoint.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Consume messages
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT_MS);
                // Process messages
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        forward(record);
                    } catch (RuntimeException e) {
                        // Isolate per-record failures (bad JSON, HTTP errors) so the
                        // worker keeps consuming instead of dying silently.
                        System.err.println("failed to forward record at offset "
                                + record.offset() + ": " + e);
                    }
                }
            }
        } finally {
            // Release the consumer on every exit path (interrupt or poll failure).
            kafkaConsumer.close();
        }
    }

    /** Parses one record into a {@link ReportData} and POSTs it to the alarm endpoint. */
    private void forward(ConsumerRecord<String, String> record) {
        System.out.printf("offset = %d, value = %s", record.offset(), record.value());
        System.out.println();

        String jsonStr = record.value();
        ReportData reportData = (ReportData) JsonUtils.getDTO(jsonStr, ReportData.class);
        if (reportData == null) {
            // Nothing usable to send; skip rather than fail on a null payload.
            System.err.println("skipping record at offset " + record.offset()
                    + ": value could not be converted to ReportData");
            return;
        }

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<ReportData> entity = new HttpEntity<ReportData>(reportData,headers);
        System.out.println("reportdata========"+reportData.toString());
        // The response body is not used; MSG.class only selects the response type.
        restTemplate.postForObject(ALARM_ENDPOINT,entity,MSG.class);
        System.out.println("msg----------------");
    }
}
